# Consumer 源码分析

作者:Ethan.Yang
博客:https://blog.ethanyang.cn (opens new window)


# 1 消费者启动流程

下面结合一个最简单的 Consumer Demo 来分析 RocketMQ 消费者启动全流程

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_consumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("TopicTest", "*");

// 重点:这是启动入口
consumer.start();
1
2
3
4
5
6

image.png

# 1.1 DefaultMQPushConsumer.start()

客户端入口方法:

@Override
public void start() throws MQClientException {
    setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
    // ⭐ 核心:调用内部实现类 DefaultMQPushConsumerImpl.start()
    this.defaultMQPushConsumerImpl.start();

    // (可选)启动消息轨迹
    if (traceDispatcher != null) { traceDispatcher.start(...); }
}

1
2
3
4
5
6
7
8
9
10

真正的消费者启动逻辑是在:

DefaultMQPushConsumerImpl.start()

# 1.2 DefaultMQPushConsumerImpl.start()

该方法内容较多,我们按步骤进行拆解。

public synchronized void start() throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;

            // 1. 校验配置
            this.checkConfig();

            // 2. 处理订阅关系(并自动生成重试 Topic)
            this.copySubscription();

            // 3. 获取或创建 MQClientInstance(JVM 共用)
            this.mQClientFactory =
                MQClientManager.getInstance().getOrCreateMQClientInstance(
                    this.defaultMQPushConsumer, this.rpcHook);

            // ------------------------ 消费者核心组件初始化 ------------------------

            // 4. 初始化负载均衡组件(RebalanceImpl)
            this.rebalanceImpl.setConsumerGroup(consumerGroup);
            this.rebalanceImpl.setMessageModel(messageModel);
            this.rebalanceImpl.setAllocateMessageQueueStrategy(strategy);
            this.rebalanceImpl.setmQClientFactory(mQClientFactory);

            // 5. 创建 PullAPIWrapper(所有模式底层都是“拉”消息)
            this.pullAPIWrapper = new PullAPIWrapper(
                mQClientFactory, consumerGroup, isUnitMode());
            this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

            // 6. 初始化消费进度 OffsetStore
            this.initOffsetStore();

            // 7. 加载本地消费进度
            this.offsetStore.load();

            // 8. 构建消费线程池(顺序 / 并发)
            this.initConsumeService();

            // 9. 启动消费线程池
            this.consumeMessageService.start();

            // 10. 注册消费者到 MQClientInstance
            boolean registerOK =
                mQClientFactory.registerConsumer(consumerGroup, this);

            if (!registerOK) { throw new MQClientException("group duplicate", null); }

            // 11. 启动 MQClientInstance(网络 + 定时任务 + rebalance)
            mQClientFactory.start();

            this.serviceState = ServiceState.RUNNING;
            break;
    }

    // -------------------- 启动后的初始化逻辑(非常重要) --------------------

    // 12. 更新订阅 Topic 的路由信息
    this.updateTopicSubscribeInfoWhenSubscriptionChanged();

    // 13. 检测 Broker 与 Client 状态
    this.mQClientFactory.checkClientInBroker();

    // 14. 向所有 Broker 发送心跳包
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

    // 15. 触发一次立即重平衡
    this.mQClientFactory.rebalanceImmediately();
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69

# 1.3 启动流程总结

1  校验配置
2  加工订阅关系
3  创建获取 MQClientInstance(JVM 共享)
4  初始化负载均衡组件
5  创建 PullAPIWrapper(底层拉模式)
6  创建 OffsetStore(本地 / 远程)
7  加载消费进度 offset
8  根据监听器创建消费线程(顺序/并发)
9  启动消费线程池
10 注册 consumer 到 MQClientInstance
11 启动 MQClientInstance(网络、定时任务、拉取线程)
12 更新订阅路由信息
13 检测客户端在 Broker 的状态
14 发送心跳到所有 Broker
15 立即执行一次重平衡
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 2 消费者模式

RocketMQ 的消费者有两种消息模型:

  • 集群消费(Clustering)
  • 广播消费(Broadcasting)

两者本质上的区别: 👉 消息是否只会被 Consumer Group 内的一个实例处理 👉 消费进度(Offset)存储在哪儿

# 2.1 集群消费

image.png

概念

集群消费意味着:

同一个 Consumer Group 内的多个 Consumer 实例平均分摊消息,一条消息只会被该组内的其中一个实例消费。

适用于:

  • 电商下单
  • 创建订单
  • 扣减库存
  • 任何希望只执行一次的业务逻辑

工作原理

假设:

  • Topic 有 3 条 MessageQueue(Q0 / Q1 / Q2)
  • Consumer Group 内有 3 个实例(C1 / C2 / C3)

那么 Rebalance 时会分配:

实例 分配到的队列
C1 Q0
C2 Q1
C3 Q2

Producer 发送消息时轮询所有 Q,因此消息会平均分散到各个队列,而消费者实例也会“平均消费”。

消费进度(Consumer Offset)存储方式

集群模式下:

Offset 持久化到 Broker(RemoteBrokerOffsetStore)

优势:

  • 多实例共享消费进度, 所以存储到Broker中
  • 实例重启不会重复消费
  • 消费者数量可水平扩容(自动分配队列)

# 2.2 广播消费

image.png

概念

广播消费意味着:

同一个 Consumer Group 内的每个实例都会消费所有消息。消息会广播给每个实例各一份。

适用于:

  • 配置刷新
  • 全量缓存更新
  • 各实例独立处理同一份数据的场景

工作原理

广播模式下:

  • 每个 Consumer 实例都会订阅 Topic 下所有 MessageQueue
  • 因此每条消息会被 Group 内“每个实例”接收一次

换句话说:

实例 能消费哪些 Q?
C1 Q0 / Q1 / Q2(全部)
C2 Q0 / Q1 / Q2(全部)
C3 Q0 / Q1 / Q2(全部)

消费进度(Consumer Offset)存储方式

广播模式下:

Offset 持久化到本地文件(LocalFileOffsetStore)

每个实例独立维护 offset,不相互影响。

# 3 Consumer负载均衡

# 3.1 集群模式

在集群消费模式下:

同一个 Consumer Group 内的多个 Consumer 实例需要“分摊消费”一个 Topic 的所有 MessageQueue(MQ)。 一条消息只会被 Group 内其中一个实例消费。

因此,Rebalance 的目标是:

  • 让每个 MessageQueue 只被一个实例消费
  • 让每个实例尽可能平均地分配到相同数量的 MQ

RocketMQ 采用“主动拉取”模式消费消息,因此每个 Consumer 在拉取时必须明确:

我属于消费组 X,我负责拉取 Topic T 的哪个 MessageQueue?
1

为了决定这一点,就需要 Rebalance。

# 3.1.1 何时触发 Rebalance?

以下情况都会触发:

  • Consumer 实例数量变化(上线 / 下线)
  • Topic 的队列数量变化
  • Broker 状态变化
  • 定时 Rebalance(默认每 20 秒)

Rebalance 的核心逻辑在 RebalanceService.run() 中持续执行。

# 3.1.2 AllocateMessageQueueAveragely

算法说明:

** 平均分配算法, 将 MQ 列表平均切分给各个 Consumer 实例。**

例如:

Topic 有 MQ Consumer 实例 分配结果
8 条 3 个 C1: 0,1,2 ;C2:3,4,5;C3:6,7

特点:

  • 完全平均
  • 顺序连续
  • 最常用的分配算法(默认)

# 3.1.3 AllocateMessageQueueAveragelyByCircle

同样是平均分配,但采用“轮询取队列”的方式:

示例(MQ=8,实例=3):

实例 分配到的 MQ
C1 0,3,6
C2 1,4,7
C3 2,5

适用于“避免相邻队列集中在同一实例”的场景。

# 3.1.4 为什么一个 Queue 必须只分给一个实例?

因为 RocketMQ 是 Consumer 主动拉取消息(Pull 模式):

  • 如果 Q0 同时分给 C1 和 C2
  • 那两者都会从 Q0 拉取消息
  • 同一条消息会被两个实例同时消费 → 完全违背集群消费语义

所以设计上保证:

MessageQueue : Consumer 是 N : 1
1

即一个MessageQueue只能被一个Consumer负责, 但一个 Consumer 实例可以负责多个不同的 Queue。

# 3.1.5 Queue 数量与 Consumer 数量的关系

如果 Consumer 数量 > Queue 数量,多出来的 Consumer 实例将无法分到队列,也就无法消费任何消息。

Example:

MQ 数量 Consumer 实例数 有效消费实例 空闲实例
4 2 全部可消费 0
4 4 全部可消费 0
4 6 4 个可消费 2 个空闲(没队列可拉)

因此:

推荐:Queue 数量 ≥ Consumer 实例数

Topic 在创建时应规划合适的 Queue 个数(常见为 4 / 8 / 16 / 32)。

# 3.2 广播模式

广播消费模式下:

Consumer Group 中的每个实例必须消费所有队列的全部消息。 不存在“分摊”概念。

广播模式的特点:

  • 每个实例分配 全部 MessageQueue
  • 每条消息被 Group 内所有实例消费一次
  • Offset 保存在本地,不共享

# 4 并发消费流程

一般我们在消费时使用回调函数的方式,使用得最多的是并发消费,消费者客户端代码如下:

// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                    ConsumeConcurrentlyContext context) {
        try {
            for (MessageExt msg : msgs) {
                String topic = msg.getTopic();
                String msgBody = new String(msg.getBody(), "utf-8");
                String tags = msg.getTags();
                Thread.sleep(1000);
                System.out.println("收到消息:" + " topic :" + topic
                        + " ,tags : " + tags
                        + " ,msg : " + msgBody);
            }
        } catch (Exception e) {
            e.printStackTrace();
            // 告诉 Broker 稍后重投
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

回调只是最上层入口,真正的消费流程是由 RocketMQ 客户端在内部帮我们完成的,大致可以拆成:

  1. 获取 Topic 的路由信息(路由发现)
  2. 获取当前 Group 的 Consumer 列表(做队列负载均衡)
  3. 为当前 Consumer 分配 MessageQueue,并计算每个 Queue 的消费起点 Offset
  4. 基于 Pull 模式拉取消息(Push 是“伪推”,底层仍然是拉)
  5. 投递给消费线程池并回调 MessageListenerConcurrently
  6. 定期持久化消费进度 Offset
  7. Consumer 关闭 / 注销时的清理

下面分步骤结合源码看一下。

# 4.1 获取 Topic 配置信息(路由信息)

在消费者启动之后,第一步都是从 NameServer 中获取 Topic 相关的路由信息(有哪些 Broker、写队列数、读队列数等)。

这一步是在 MQClientInstance.start() 里完成初始化 + 定时任务的:

public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // 如果未显式配置 namesrvAddr,则先拉取 NameServer 地址列表
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // 启动 Remoting 通道(与 NameServer / Broker 建立 Netty 连接)
                this.mQClientAPIImpl.start();
                // 启动各种定时任务(路由刷新、offset 持久化等)
                this.startScheduledTask();
                // 启动拉消息服务
                this.pullMessageService.start();
                // 启动负载均衡服务
                this.rebalanceService.start();
                ...
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

startScheduledTask() 里有两个关键定时任务与路由相关:

private void startScheduledTask() {
    // 1)定时拉取 NameServer 地址列表(2 分钟一次)
    if (null == this.clientConfig.getNamesrvAddr()) {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                } catch (Exception e) {
                    log.error("ScheduledTask fetchNameServerAddr exception", e);
                }
            }
        }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
    }

    // 2)30s 一次刷新 Topic 路由信息
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.updateTopicRouteInfoFromNameServer();
            } catch (Exception e) {
                log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
            }
        }
    }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

updateTopicRouteInfoFromNameServer() 的逻辑是: 把当前所有 Producer / Consumer 关注的 Topic 收集起来,逐个向 NameServer 拉取路由信息:

public void updateTopicRouteInfoFromNameServer() {
    Set<String> topicList = new HashSet<>();

    // Consumer 订阅的 Topic
    {
        Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
        while (it.hasNext()) {
            MQConsumerInner impl = it.next().getValue();
            if (impl != null) {
                Set<SubscriptionData> subList = impl.subscriptions();
                if (subList != null) {
                    for (SubscriptionData subData : subList) {
                        topicList.add(subData.getTopic());
                    }
                }
            }
        }
    }

    // Producer 发布的 Topic
    {
        Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
        while (it.hasNext()) {
            MQProducerInner impl = it.next().getValue();
            if (impl != null) {
                topicList.addAll(impl.getPublishTopicList());
            }
        }
    }

    for (String topic : topicList) {
        this.updateTopicRouteInfoFromNameServer(topic);
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

真正向 NameServer 发送请求是在 MQClientAPIImpl.getTopicRouteInfoFromNameServer 里:

public TopicRouteData getTopicRouteInfoFromNameServer(final String topic,
                                                      final long timeoutMillis,
                                                      boolean allowTopicNotExist)
        throws MQClientException, InterruptedException,
               RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
    GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
    requestHeader.setTopic(topic);
    // 生产者/消费者向 NameServer 获取路由信息
    RemotingCommand request = RemotingCommand.createRequestCommand(
            RequestCode.GET_ROUTEINFO_BY_TOPIC, requestHeader);

    RemotingCommand response =
            this.remotingClient.invokeSync(null, request, timeoutMillis);
    ...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

注意:路由刷新是定时任务,并非“实时感知” Broker 状态,所以 Broker 宕机、缩容等情况,客户端在一个短时间窗口内会有感知延迟,需要在业务上做好重试 / 容错。

# 4.2 获取 Group 的 ConsumerList(为负载均衡做准备)

消费者启动后,MQClientInstance 会启动一个专门的负载均衡服务RebalanceService,间隔一段时间做一次“分配队列”的工作。

MQClientInstance.start()里已经看到:

// 12.2 开启拉消息服务(消费者:线程)
this.pullMessageService.start();
// 12.3 开启负载均衡服务(消费者:线程)
this.rebalanceService.start();
1
2
3
4

RebalanceService 的核心逻辑很简单:每隔 20s 调用一次 mqClientFactory.doRebalance()

public class RebalanceService extends ServiceThread {
    private static long waitInterval =
        Long.parseLong(System.getProperty(
            "rocketmq.client.rebalance.waitInterval", "20000"));

    private final MQClientInstance mqClientFactory;

    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {
            this.waitForRunning(waitInterval);
            this.mqClientFactory.doRebalance();
        }
        log.info(this.getServiceName() + " service end");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

MQClientInstance.doRebalance() 会遍历当前进程内所有 Consumer:

public void doRebalance() {
    for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
        MQConsumerInner impl = entry.getValue();
        if (impl != null) {
            try {
                impl.doRebalance();
            } catch (Throwable e) {
                log.error("doRebalance exception", e);
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12

对于 DefaultMQPushConsumerImpl,会委托给 RebalanceImpl

@Override
public void doRebalance() {
    if (!this.pause) {
        this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
    }
}
1
2
3
4
5
6

RebalanceImpl.doRebalance() 中,会遍历当前 Consumer 订阅的所有 Topic,并按 Topic 做 Rebalance:

public void doRebalance(final boolean isOrder) {
    Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
    if (subTable != null) {
        for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
            final String topic = entry.getKey();
            try {
                this.rebalanceByTopic(topic, isOrder);
            } catch (Throwable e) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("rebalanceByTopic Exception", e);
                }
            }
        }
    }
    this.truncateMessageQueueNotMyTopic();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

集群 vs 广播:这里开始分叉

广播模式(BROADCASTING): 不需要从 Broker 获取 Consumer 列表,也没有队列级别负载均衡 —— 每个 Consumer 都消费 Topic 下所有队列。

case BROADCASTING: {
    Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
    if (mqSet != null) {
        boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
        if (changed) {
            this.messageQueueChanged(topic, mqSet, mqSet);
            log.info("messageQueueChanged {} {} {} {}",
                consumerGroup, topic, mqSet, mqSet);
        }
    } else {
        log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
    }
    break;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

集群模式(CLUSTERING): 需要先从 Broker 获取当前 Group 的所有 ConsumerId 列表,然后通过分配策略 AllocateMessageQueueStrategy 做队列分配。

case CLUSTERING: {
    Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
    // 从 Broker 拉取该 Group 下所有 ConsumerId
    List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
    ...
}
1
2
3
4
5
6

findConsumerIdList 会先通过 Topic 找到某个 Broker 地址,再请求该 Broker 获取 Consumer 列表:

public List<String> findConsumerIdList(final String topic, final String group) {
    String brokerAddr = this.findBrokerAddrByTopic(topic);
    if (null == brokerAddr) {
        this.updateTopicRouteInfoFromNameServer(topic);
        brokerAddr = this.findBrokerAddrByTopic(topic);
    }

    if (null != brokerAddr) {
        try {
            return this.mQClientAPIImpl.getConsumerIdListByGroup(brokerAddr, group, 3000);
        } catch (Exception e) {
            log.warn("getConsumerIdListByGroup exception, " + brokerAddr + " " + group, e);
        }
    }
    return null;
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

生成请求的代码:

public List<String> getConsumerIdListByGroup(final String addr,
                                             final String consumerGroup,
                                             final long timeoutMillis)
        throws RemotingConnectException, RemotingSendRequestException,
               RemotingTimeoutException, MQBrokerException, InterruptedException {
    GetConsumerListByGroupRequestHeader requestHeader = new GetConsumerListByGroupRequestHeader();
    requestHeader.setConsumerGroup(consumerGroup);
    // 获取 Group 对应的 ConsumerId 列表
    RemotingCommand request = RemotingCommand.createRequestCommand(
            RequestCode.GET_CONSUMER_LIST_BY_GROUP, requestHeader);
    ...
}
1
2
3
4
5
6
7
8
9
10
11
12

到这里,我们已经:

  • 拿到了 Topic 的路由信息(有哪些 Queue)
  • 拿到了某个 Group 下所有 ConsumerId(集群模式)
  • 通过 Rebalance 做好了“某个 Consumer 分到哪些 Queue”的决定

接下来就进入 “为分配到的每个 Queue 计算消费起点 Offset、并开始拉消息” 的流程。

# 4.3 获取 Queue 的消费 Offset

队列分配完成后,需要为新分配到的 MessageQueue 计算消费起始 Offset。这一步顺带把“拉消息请求”放入拉取队列中。

核心逻辑在 RebalanceImpl.updateProcessQueueTableInRebalance 中,这里简化看关键步骤:

  1. 找到当前应该分配给本 Consumer 的队列集合 mqSet(Rebalance 得来的)
  2. 和本地 processQueueTable 做对比:
    • 已经不属于本 Consumer 的队列 → 标记为 dropped,停止拉取
    • 新分配到的队列 → 计算 Offset,并构造 PullRequest,交给 PullMessageService

伪代码结构大致如下(不同版本源码可能略有出入):

boolean updateProcessQueueTableInRebalance(final String topic,
                                           final Set<MessageQueue> mqSet,
                                           final boolean isOrder) {
    // this.processQueueTable: <MessageQueue, ProcessQueue>
    // 新老对比,找出需要移除和新增的 Queue
    // 1. 移除不再属于当前 Consumer 的队列
    ...
    // 2. 为新分配到的队列创建 ProcessQueue,并计算拉取起点 Offset
    for (MessageQueue mq : mqSet) {
        if (!this.processQueueTable.containsKey(mq)) {
            long nextOffset = this.computePullFromWhere(mq);
            if (nextOffset >= 0) {
                ProcessQueue pq = new ProcessQueue();
                this.processQueueTable.put(mq, pq);

                PullRequest pullRequest = new PullRequest();
                pullRequest.setConsumerGroup(this.consumerGroup);
                pullRequest.setMessageQueue(mq);
                pullRequest.setProcessQueue(pq);
                pullRequest.setNextOffset(nextOffset);

                // 提交到 PullMessageService
                this.dispatchPullRequest(pullRequest);
            } else {
                log.warn("doRebalance, {}, computePullFromWhere failed, mq={}",
                         consumerGroup, mq);
            }
        }
    }
    ...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

computePullFromWhere(mq) 就是真正读取 Offset 的地方

  • 集群模式:从 Broker 存储的 offset 中读取(RemoteBrokerOffsetStore
  • 广播模式:从本地文件中读取(LocalFileOffsetStore

并根据 consumeFromWhere 策略(CONSUME_FROM_LAST_OFFSET / FIRST_OFFSET / TIMESTAMP)决定:

public long computePullFromWhere(final MessageQueue mq) {
    long result = -1;
    switch (this.consumeFromWhere) {
        case CONSUME_FROM_LAST_OFFSET:
            // 优先使用已经存在的 offset;否则从队列尾部开始
            ...
        case CONSUME_FROM_FIRST_OFFSET:
            // 没有历史 offset 时,从头开始
            ...
        case CONSUME_FROM_TIMESTAMP:
            // 根据时间戳在队列中查找 offset
            ...
    }
    return result;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

到这里为止:

每个分配给当前 Consumer 的 MessageQueue 都有了一个起始 Offset,并且生成了对应的 PullRequest,等待拉消息服务去执行。

# 4.4 拉取 Queue 的消息

PullMessageService 是一个专门的线程,不断从内部阻塞队列中取出 PullRequest,然后调用 DefaultMQPushConsumerImpl.pullMessage() 去拉消息。

# 4.4.1 PullMessageService 主循环

public class PullMessageService extends ServiceThread {

    private final LinkedBlockingQueue<PullRequest> pullRequestQueue =
            new LinkedBlockingQueue<>();

    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                // 阻塞获取 PullRequest
                PullRequest pullRequest = this.pullRequestQueue.take();
                if (pullRequest != null) {
                    this.pullMessage(pullRequest);
                }
            } catch (InterruptedException ignored) {
            } catch (Exception e) {
                log.error("PullMessageService run error", e);
            }
        }

        log.info(this.getServiceName() + " service end");
    }

    public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
        // 延迟投递的逻辑,略
    }

    public void executePullRequestImmediately(final PullRequest pullRequest) {
        this.pullRequestQueue.put(pullRequest);
    }

    private void pullMessage(final PullRequest pullRequest) {
        final MQConsumerInner consumer = this.mqClientFactory.selectConsumer(pullRequest.getConsumerGroup());
        if (consumer != null) {
            DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
            impl.pullMessage(pullRequest);
        } else {
            log.warn("No matched consumer for the PullRequest: {}", pullRequest);
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

# 4.4.2 DefaultMQPushConsumerImpl.pullMessage

真正发起“拉消息请求”的地方:

public void pullMessage(final PullRequest pullRequest) {
    final ProcessQueue processQueue = pullRequest.getProcessQueue();
    final MessageQueue messageQueue = pullRequest.getMessageQueue();

    if (processQueue.isDropped()) {
        log.info("the pull request[{}] is dropped.", pullRequest.toString());
        return;
    }

    // 流控、暂停等校验略...

    final long offset = pullRequest.getNextOffset();

    // 构建拉取请求,调用 PullAPIWrapper
    this.pullAPIWrapper.pullKernelImpl(
            messageQueue,
            null,                   // subscriptionData 的表达式
            this.defaultMQPushConsumer.getConsumerGroup(),
            this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(),
            offset,
            this.defaultMQPushConsumer.getPullBatchSize(),
            false,
            timeoutMillis,
            COMMUNICATION_MODE_ASYNC,
            new PullCallback() {
                @Override
                public void onSuccess(PullResult pullResult) {
                    // 拉取结果回调
                    pullMessageCallback(pullRequest, pullResult);
                }

                @Override
                public void onException(Throwable e) {
                    // 异常时,稍后再拉
                    executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
                }
            }
    );
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

在回调 pullMessageCallback 中会:

  1. 根据 PullResult 状态处理(FOUND / NO_NEW_MSG / NO_MATCHED_MSG 等)
  2. 把拉取到的消息存入 ProcessQueue
  3. 更新 PullRequestnextOffset
  4. 提交消费任务到消费线程池(并发消费就是 ConsumeMessageConcurrentlyService

示意(伪代码):

private void pullMessageCallback(final PullRequest pullRequest,
                                 final PullResult pullResult) {
    switch (pullResult.getPullStatus()) {
        case FOUND:
            // 把消息放入 ProcessQueue
            int msgCount = processQueue.putMessage(pullResult.getMsgFoundList());
            // 提交消费任务
            consumeMessageService.submitConsumeRequest(
                    pullResult.getMsgFoundList(),
                    processQueue,
                    pullRequest.getMessageQueue(),
                    true);
            // 更新下次拉取的 offset
            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
            // 继续拉下一批
            executePullRequestImmediately(pullRequest);
            break;
        case NO_NEW_MSG:
        case NO_MATCHED_MSG:
            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
            executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_NO_NEW_MSG);
            break;
        ...
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

# 4.4.3 并发消费:ConsumeMessageConcurrentlyService

并发消费对应的是 ConsumeMessageConcurrentlyService,内部用线程池来并行执行消费回调:

public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {

    private final ThreadPoolExecutor consumeExecutor;

    @Override
    public void submitConsumeRequest(
            final List<MessageExt> msgs,
            final ProcessQueue processQueue,
            final MessageQueue messageQueue,
            final boolean dispatchToConsume) {

        // 略去拆分批次逻辑
        ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
        this.consumeExecutor.submit(consumeRequest);
    }

    class ConsumeRequest implements Runnable {
        private final List<MessageExt> msgs;
        private final ProcessQueue processQueue;
        private final MessageQueue messageQueue;

        @Override
        public void run() {
            // 回调用户注册的 MessageListenerConcurrently
            ConsumeConcurrentlyStatus status =
                    messageListener.consumeMessage(msgs, context);

            // 根据返回结果处理:成功 / 重投
            processConsumeResult(status, context, this);
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

这就是 MessageListenerConcurrently 被调用的地方。

# 4.5 更新 Queue 的消费 Offset

偏移量的更新有两个层面:

  1. 内存中的 offset:每次消费结果处理时更新
  2. 持久化 offset:定期把内存中的 offset 刷新到 Broker(集群模式)或本地文件(广播模式)

# 4.5.1 消费结果后的 offset 更新

仍然在 ConsumeMessageConcurrentlyService.processConsumeResult 中处理:

private void processConsumeResult(
        final ConsumeConcurrentlyStatus status,
        final ConsumeConcurrentlyContext context,
        final ConsumeRequest consumeRequest) {

    switch (status) {
        case CONSUME_SUCCESS:
            // 成功消费,计算本次消费的最大 offset
            long offset = consumeRequest.getOffset();
            // 更新本地 offsetStore 中该 Queue 的内存 offset
            this.defaultMQPushConsumerImpl.getOffsetStore()
                    .updateOffset(consumeRequest.getMessageQueue(), offset, true);
            break;

        case RECONSUME_LATER:
            // 失败则发送重投请求(重试队列),offset 通常不前移或由重试逻辑单独处理
            ...
            break;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

这里的 updateOffset 一般只更新到内存结构(例如 ConcurrentMap),不一定立刻刷到远端。

# 4.5.2 定时持久化 offset

真正的持久化是在 MQClientInstance.startScheduledTask() 中的另一个定时任务完成的:

private void startScheduledTask() {
    ...
    // 持久化消费进度:默认每 5s 一次
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.persistAllConsumerOffset();
            } catch (Exception e) {
                log.error("ScheduledTask persistAllConsumerOffset exception", e);
            }
        }
    }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

persistAllConsumerOffset() 会遍历所有 Consumer,调用各自的 offsetStore.persistAll

  • 集群模式RemoteBrokerOffsetStore,offset 持久化到 Broker
  • 广播模式LocalFileOffsetStore,offset 持久化到本地文件

# 4.6 注销Consumer

当应用关闭或者你主动调用 consumer.shutdown() 时,需要:

  1. 停止拉消息服务、负载均衡服务
  2. 持久化当前 offset
  3. MQClientInstanceconsumerTable 中移除
  4. 向 Broker 发送 UNREGISTER 消费者的请求

DefaultMQPushConsumerImpl.shutdown() 中:

public void shutdown() {
    this.defaultMQPushConsumerImpl.shutdown();
}

public void shutdown() {
    synchronized (this) {
        switch (this.serviceState) {
            case RUNNING:
                this.serviceState = ServiceState.SHUTDOWN_ALREADY;

                // 1. 持久化当前消费者的 offset
                this.persistConsumerOffset();

                // 2. 从 MQClientInstance 中注销当前 Consumer
                this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup());

                // 3. 关闭内部各种服务(pullMessageService、rebalanceService 等)
                this.mQClientFactory.shutdown();
                ...
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

unregisterConsumer 内部会发送 UNREGISTER_CLIENT 给 Broker,Broker 侧会从内存结构中移除对应的 Consumer 信息(包括心跳信息、负载均衡信息等)。

# 5 顺序消费流程

顺序消费示例代码:

consumer.registerMessageListener(new MessageListenerOrderly() {

    Random random = new Random();

    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
                                               ConsumeOrderlyContext context) {
        // 开启自动提交
        context.setAutoCommit(true);

        for (MessageExt msg : msgs) {
            // 可以看到:同一个 queue 只会被一个消费线程处理
            System.out.println("consumeThread=" + Thread.currentThread().getName()
                    + ", queueId=" + msg.getQueueId()
                    + ", content=" + new String(msg.getBody()));
        }

        try {
            // 模拟业务处理耗时
            TimeUnit.SECONDS.sleep(random.nextInt(10));
        } catch (Exception e) {
            e.printStackTrace();
        }

        return ConsumeOrderlyStatus.SUCCESS;
    }
});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

整体骨架和并发消费是一样的:

还是:Rebalance 分配队列 → Pull 拉消息 → 消费线程池回调监听器。

唯一核心差异:*顺序消费多了一套“队列加锁 + 单线程串行消费”的机制,确保*同一个队列同一时刻只被一个消费者线程消费,从而保证队列内消息顺序**。

下面就只强调顺序消费比并发消费多出来的那一块:“锁队列”的实现流程

# 5.1 顺序消费服务启动:周期性锁队列

顺序消费对应的实现类是 ConsumeMessageOrderlyService

start() 时,如果是集群模式,会启动一个定时任务,定期去 Broker 续锁:

public void start() {
    if (MessageModel.CLUSTERING.equals(
            ConsumeMessageOrderlyService.this
                .defaultMQPushConsumerImpl.messageModel())) {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                ConsumeMessageOrderlyService.this.lockMQPeriodically();
            }
        }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12

默认:每 20 秒执行一次 lockMQPeriodically,锁有效期约 60 秒,即:定期续约队列锁

lockMQPeriodically() 很简单,直接调用 RebalanceImpl:

public synchronized void lockMQPeriodically() {
    if (!this.stopped) {
        this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();
    }
}
1
2
3
4
5

# 5.2 客户端按 Broker 批量锁队列

RebalanceImpl.lockAll() 会先按 Broker 维度,把当前进程持有的 MessageQueue 分组,然后对每个 Broker 批量发送锁请求:

public void lockAll() {
    HashMap<String, Set<MessageQueue>> brokerMqs =
            this.buildProcessQueueTableByBrokerName();

    for (Entry<String, Set<MessageQueue>> entry : brokerMqs.entrySet()) {
        final String brokerName = entry.getKey();
        final Set<MessageQueue> mqs = entry.getValue();
        if (mqs.isEmpty()) continue;

        FindBrokerResult findBrokerResult =
                this.mQClientFactory.findBrokerAddressInSubscribe(
                        brokerName, MixAll.MASTER_ID, true);
        if (findBrokerResult != null) {
            LockBatchRequestBody requestBody = new LockBatchRequestBody();
            requestBody.setConsumerGroup(this.consumerGroup);
            requestBody.setClientId(this.mQClientFactory.getClientId());
            requestBody.setMqSet(mqs);

            try {
                Set<MessageQueue> lockOKMQSet =
                        this.mQClientFactory.getMQClientAPIImpl()
                                .lockBatchMQ(findBrokerResult.getBrokerAddr(),
                                        requestBody, 1000);
                // 根据返回结果,把成功锁住的队列标记为 locked
                ...
            } catch (Exception e) {
                ...
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

发送锁请求的真正实现:

public Set<MessageQueue> lockBatchMQ(final String addr,
                                     final LockBatchRequestBody requestBody,
                                     final long timeoutMillis)
        throws RemotingException, MQBrokerException, InterruptedException {
    RemotingCommand request =
            RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null);
    request.setBody(requestBody.encode());

    RemotingCommand response = this.remotingClient.invokeSync(
            MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
            request, timeoutMillis);

    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            LockBatchResponseBody responseBody =
                    LockBatchResponseBody.decode(response.getBody(), LockBatchResponseBody.class);
            // Broker 返回成功加锁的 MessageQueue 集合
            return responseBody.getLockOKMQSet();
        }
        default:
            break;
    }

    throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

关键点:

  • 锁是 Broker 端维护的(哪个 Group 的哪个 clientId 拿到了某个队列的锁);
  • 客户端只是在本地 ProcessQueue 上标记“是否已锁定”,没有抢到锁的队列不会真正开始消费。

# 5.3 消费端如何利用“队列锁”保证顺序?

顺序消费下,整体流程还是:

PullMessageService 拉消息 → 放入 ProcessQueue → 丢给 ConsumeMessageOrderlyService 的线程池。

区别在于:

  1. 只有拿到锁的队列才会被拉取 + 提交消费任务;
  2. 同一个队列只会由一个消费线程串行执行;
  3. 如果锁失效 / rebalance 导致队列被分配给别的实例,当前实例会把 ProcessQueue 标记为 dropped,此后不再拉取或消费。

所以顺序保证是按队列维度的:

  • 同一个 MessageQueue:加锁 + 单线程串行消费 → 有序
  • 不同 MessageQueue:互相独立,可以并行消费 → 全局无序,但“分区内有序”

这也就是常说的:

RocketMQ 只能保证“分区(队列)内顺序”,而不能保证“Topic 全局顺序”。

# 5.4 关闭顺序消费:释放队列锁

在顺序消费服务 ConsumeMessageOrderlyService.shutdown() 中,会优雅关闭线程池,并在集群模式下释放所有队列锁:

public void shutdown(long awaitTerminateMillis) {
    this.stopped = true;
    this.scheduledExecutorService.shutdown();
    ThreadUtils.shutdownGracefully(this.consumeExecutor,
            awaitTerminateMillis, TimeUnit.MILLISECONDS);

    if (MessageModel.CLUSTERING.equals(
            this.defaultMQPushConsumerImpl.messageModel())) {
        this.unlockAllMQ();
    }
}
1
2
3
4
5
6
7
8
9
10
11

解锁逻辑同样委托给 RebalanceImpl:

public synchronized void unlockAllMQ() {
    this.defaultMQPushConsumerImpl.getRebalanceImpl().unlockAll(false);
}
1
2
3

unlockAll 和加锁类似,按 Broker 分组,然后批量发送 UNLOCK 请求:

public void unlockAll(final boolean oneway) {
    HashMap<String, Set<MessageQueue>> brokerMqs =
            this.buildProcessQueueTableByBrokerName();

    for (Entry<String, Set<MessageQueue>> entry : brokerMqs.entrySet()) {
        final String brokerName = entry.getKey();
        final Set<MessageQueue> mqs = entry.getValue();
        if (mqs.isEmpty()) continue;

        FindBrokerResult findBrokerResult =
                this.mQClientFactory.findBrokerAddressInSubscribe(
                        brokerName, MixAll.MASTER_ID, true);

        if (findBrokerResult != null) {
            UnlockBatchRequestBody requestBody = new UnlockBatchRequestBody();
            requestBody.setConsumerGroup(this.consumerGroup);
            requestBody.setClientId(this.mQClientFactory.getClientId());
            requestBody.setMqSet(mqs);

            this.mQClientFactory.getMQClientAPIImpl()
                .unlockBatchMQ(findBrokerResult.getBrokerAddr(),
                               requestBody, 1000, oneway);
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

unlockBatchMQ 内部就是发一个 UNLOCK_BATCH_MQ 请求给 Broker,Broker 端删除对应的锁记录。

# 6 消费卡死

在 RocketMQ 顺序消费中,最容易让人产生“卡死”、”不动了“的现象,主要有两类原因:

  • 队列锁(Queue Lock)被占用 / 未释放
  • Broker 主从切换(Master 挂了导致锁丢失)

下面把逻辑讲完整。

# 6.1 队列锁导致的“消费卡死”

顺序消费和并发消费最大的区别是:

同一个 MessageQueue 必须严格串行消费。 为此,RocketMQ 会在 Broker 端给队列加一个“分布式锁”。

流程如下:

  1. 消费者 A 请求消费 Queue-0 → Broker 给它加锁;
  2. 消费者 A 一直持有该队列的锁,其他消费者无法消费 Queue-0;
  3. 锁的默认有效期是 60 秒,消费者客户端每 20 秒续约一次;

如果消费者 A 在消费中途挂掉了,会发生什么?

  • Broker 会认为 A 还持有锁(因为锁还没过期)
  • 在锁的 60 秒 TTL 内,其他消费者 B 无法获得该队列的锁

结果就是:

B 收到 Queue-0,但拉消息却一直失败 → 会表现为“消费卡死/不动了”。

直到锁在 Broker 上自然过期,另一个消费者才能重新获得锁。

简单总结

  • 顺序消费 = Broker 端队列锁
  • Consumer 异常退出 → 锁无法立即释放,只能等 TTL 过期
  • 在此期间,队列无法被其他实例消费 → “卡死”

这是顺序消费的常见现象,并非 Bug,而是锁的特性。

# 6.2 Broker 主从切换导致的“卡死”

RocketMQ 顺序消息要求:

  • 加锁一定在 Master 上进行
  • 消费失败也一定走 Master 处理

原因:锁记录只在 Master 内存中维护,不会同步到 Slave。

如果 Master 挂了,会发生什么?

假设如下场景:

  1. Consumer A 已经从 Master 获得了 Queue-0 的锁;
  2. Master 突然宕机;
  3. Consumer B 想切到 Slave 继续消费 Queue-0;

但问题是:

Slave 上没有锁记录,也没有锁系统。 Slave 模式下,不提供加锁能力。

所以会出现:

  • Consumer B 发起锁请求 → 无响应 / 失败
  • Consumer B 拉不到消息 → 消费能力暂停
  • 直到 Master 重新恢复(或 HA 切主成功)

因此顺序消息在主从切换时会发生:

短暂的不可消费状态(类似“挂起”)→ 表现为消费卡住不动。

# 6.3 总结与设计原因

场景 根本原因 表现
Consumer 异常退出 Broker 的队列锁 TTL(60s)未过期 其他消费者无法接管该队列,消费暂停 60s
Master 挂掉,Slave 接管 加锁逻辑只在 Master,中途无法加锁 顺序队列在主从切换窗口不可消费

核心原因: 顺序消息的“顺序性大于可用性”。

要保证一条队列上的消息严格顺序,就必须做到:

  • 一个队列只能由一个消费线程消费
  • 任意时刻不能有两个消费者同时处理它
  • 锁要能防止“脏接管”(即前一个实例还没释放,别人不能抢)

也就是说:

顺序消费的强一致性必然牺牲部分可用性。

# 6.4 解决方案

消费卡死原因不在于RocketMQ架构设计, 而在于消费端或者集群节点不稳定

  1. 手动控制 lockIntervallockExpire

    RocketMQ 客户端可调节定时续锁频率(20s)与锁 TTL(60s),但一般不推荐过度修改。

  2. 避免业务阻塞

  3. 尽量减少使用顺序消费的业务场景

# 7 启动之后较长时间才消费

启动 RocketMQ 消费者后,常会发现需要一段时间(几秒甚至几十秒)才能正式开始消费。主要原因有两类:

  1. 并发消费:启动流程步骤多、成本高

    PushConsumer 启动后,需要完成大量初始化动作:

    1. 从 NameServer 拉取 Topic 路由
    2. 建立与 Broker 的网络连接
    3. 启动拉消息线程(PullMessageService)
    4. 启动负载均衡线程(RebalanceService)
    5. 根据 Group 做队列负载均衡
    6. 为每个队列计算 offset
    7. 构造并提交首个 PullRequest
    8. 消费线程池初始化

    尤其是:

    • 订阅的 Topic 多
    • 队列数量多
    • Consumer 实例数量多(需要协调 Rebalance)

    时,这一整套过程会更慢,所以消费者会出现启动后延迟几秒开始消费的现象。

  2. **顺序消费:队列锁导致的“冷启动等待” **

    顺序消费额外依赖Broker 端的队列锁

    如果之前的 Consumer 异常退出,Broker 上的锁不会立刻释放,需要等待:

    默认 60 秒的锁有效期自动过期

    因此,新启动的 Consumer 想要消费这个队列时:

    • 会先尝试加锁
    • 如果上一实例的锁还没过期 → 加锁失败
    • 只能等待下一次续锁时机(20s)+ 锁过期(最长 60s)

    最终表现就是:

    顺序消息消费者启动后要等几十秒才能开始消费,看起来像“卡住了”。